// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan

use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::{and, LogicalPlan};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::{error::Result, logical_plan::Operator};
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

/// Filter Push Down optimizer rule pushes filter clauses down the plan
/// # Introduction
/// A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
/// An example of a filter-commutative operation is a projection; a counter-example is `limit`.
///
/// The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
/// can commute with a filter that depends on A only, but does not commute with a filter that depends
/// on SUM(B).
///
/// This optimizer commutes filters with filter-commutative operations to push the filters
/// the closest possible to the scans, re-writing the filter expressions by every
/// projection that changes the filter's expression.
///
/// Filter: #b Gt Int64(10)
///     Projection: #a AS b
///
/// is optimized to
///
/// Projection: #a AS b
///     Filter: #a Gt Int64(10)  <--- changed from #b to #a
///
/// This performs a single pass trought the plan. When it passes trought a filter, it stores that filter,
/// and when it reaches a node that does not commute with it, it adds the filter to that place.
/// When it passes through a projection, it re-writes the filter's expression taking into accoun that projection.
/// When multiple filters would have been written, it `AND` their expressions into a single expression.
pub struct FilterPushDown {}

#[derive(Debug, Clone, Default)]
struct State {
    // (predicate, columns on the predicate)
    filters: Vec<(Expr, HashSet<String>)>,
}

type Predicates<'a> = (Vec<&'a Expr>, Vec<&'a HashSet<String>>);

/// returns all predicates in `state` that depend on any of `used_columns`
fn get_predicates<'a>(
    state: &'a State,
    used_columns: &HashSet<String>,
) -> Predicates<'a> {
    state
        .filters
        .iter()
        .filter(|(_, columns)| {
            !columns
                .intersection(used_columns)
                .collect::<HashSet<_>>()
                .is_empty()
        })
        .map(|&(ref a, ref b)| (a, b))
        .unzip()
}

// returns 3 (potentially overlaping) sets of predicates:
// * pushable to left: its columns are all on the left
// * pushable to right: its columns is all on the right
// * keep: the set of columns is not in only either left or right
// Note that a predicate can be both pushed to the left and to the right.
fn get_join_predicates<'a>(
    state: &'a State,
    left: &DFSchema,
    right: &DFSchema,
) -> (
    Vec<&'a HashSet<String>>,
    Vec<&'a HashSet<String>>,
    Predicates<'a>,
) {
    let left_columns = &left
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect::<HashSet<_>>();
    let right_columns = &right
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect::<HashSet<_>>();

    let filters = state
        .filters
        .iter()
        .map(|(predicate, columns)| {
            (
                (predicate, columns),
                (
                    columns,
                    left_columns.intersection(columns).collect::<HashSet<_>>(),
                    right_columns.intersection(columns).collect::<HashSet<_>>(),
                ),
            )
        })
        .collect::<Vec<_>>();

    let pushable_to_left = filters
        .iter()
        .filter(|(_, (columns, left, _))| left.len() == columns.len())
        .map(|((_, b), _)| *b)
        .collect();
    let pushable_to_right = filters
        .iter()
        .filter(|(_, (columns, _, right))| right.len() == columns.len())
        .map(|((_, b), _)| *b)
        .collect();
    let keep = filters
        .iter()
        .filter(|(_, (columns, left, right))| {
            // predicates whose columns are not in only one side of the join need to remain
            let all_in_left = left.len() == columns.len();
            let all_in_right = right.len() == columns.len();
            !all_in_left && !all_in_right
        })
        .map(|((ref a, ref b), _)| (a, b))
        .unzip();
    (pushable_to_left, pushable_to_right, keep)
}

/// Optimizes the plan
fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
    let new_inputs = plan
        .inputs()
        .iter()
        .map(|input| optimize(input, state.clone()))
        .collect::<Result<Vec<_>>>()?;

    let expr = plan.expressions();
    utils::from_plan(&plan, &expr, &new_inputs)
}

/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
/// its predicate be all `predicates` ANDed.
fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
    // reduce filters to a single filter with an AND
    let predicate = predicates
        .iter()
        .skip(1)
        .fold(predicates[0].clone(), |acc, predicate| {
            and(acc, (*predicate).to_owned())
        });

    LogicalPlan::Filter {
        predicate,
        input: Arc::new(plan),
    }
}

// remove all filters from `filters` that are in `predicate_columns`
fn remove_filters(
    filters: &[(Expr, HashSet<String>)],
    predicate_columns: &[&HashSet<String>],
) -> Vec<(Expr, HashSet<String>)> {
    filters
        .iter()
        .filter(|(_, columns)| !predicate_columns.contains(&columns))
        .cloned()
        .collect::<Vec<_>>()
}

// keeps all filters from `filters` that are in `predicate_columns`
fn keep_filters(
    filters: &[(Expr, HashSet<String>)],
    predicate_columns: &[&HashSet<String>],
) -> Vec<(Expr, HashSet<String>)> {
    filters
        .iter()
        .filter(|(_, columns)| predicate_columns.contains(&columns))
        .cloned()
        .collect::<Vec<_>>()
}

/// builds a new [LogicalPlan] from `plan` by issuing new [LogicalPlan::Filter] if any of the filters
/// in `state` depend on the columns `used_columns`.
fn issue_filters(
    mut state: State,
    used_columns: HashSet<String>,
    plan: &LogicalPlan,
) -> Result<LogicalPlan> {
    let (predicates, predicate_columns) = get_predicates(&state, &used_columns);

    if predicates.is_empty() {
        // all filters can be pushed down => optimize inputs and return new plan
        return push_down(&state, plan);
    }

    let plan = add_filter(plan.clone(), &predicates);

    state.filters = remove_filters(&state.filters, &predicate_columns);

    // continue optimization over all input nodes by cloning the current state (i.e. each node is independent)
    push_down(&state, &plan)
}

/// converts "A AND B AND C" => [A, B, C]
fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
    match predicate {
        Expr::BinaryExpr {
            right,
            op: Operator::And,
            left,
        } => {
            split_members(&left, predicates);
            split_members(&right, predicates);
        }
        other => predicates.push(other),
    }
}

fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
    match plan {
        LogicalPlan::Explain { .. } => {
            // push the optimization to the plan of this explain
            push_down(&state, plan)
        }
        LogicalPlan::Filter { input, predicate } => {
            let mut predicates = vec![];
            split_members(predicate, &mut predicates);

            // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
            let mut no_col_predicates = vec![];

            predicates
                .into_iter()
                .try_for_each::<_, Result<()>>(|predicate| {
                    let mut columns: HashSet<String> = HashSet::new();
                    utils::expr_to_column_names(predicate, &mut columns)?;
                    if columns.is_empty() {
                        no_col_predicates.push(predicate)
                    } else {
                        // collect the predicate
                        state.filters.push((predicate.clone(), columns));
                    }
                    Ok(())
                })?;
            // Predicates without columns will not be pushed down.
            // As those contain only literals, they could be optimized using constant folding
            // and removal of WHERE TRUE / WHERE FALSE
            if !no_col_predicates.is_empty() {
                Ok(add_filter(optimize(input, state)?, &no_col_predicates))
            } else {
                optimize(input, state)
            }
        }
        LogicalPlan::Projection {
            input,
            expr,
            schema,
        } => {
            // A projection is filter-commutable, but re-writes all predicate expressions
            // collect projection.
            let mut projection = HashMap::new();
            schema.fields().iter().enumerate().for_each(|(i, field)| {
                // strip alias, as they should not be part of filters
                let expr = match &expr[i] {
                    Expr::Alias(expr, _) => expr.as_ref().clone(),
                    expr => expr.clone(),
                };

                projection.insert(field.name().clone(), expr);
            });

            // re-write all filters based on this projection
            // E.g. in `Filter: #b\n  Projection: #a > 1 as b`, we can swap them, but the filter must be "#a > 1"
            for (predicate, columns) in state.filters.iter_mut() {
                *predicate = rewrite(predicate, &projection)?;

                columns.clear();
                utils::expr_to_column_names(predicate, columns)?;
            }

            // optimize inner
            let new_input = optimize(input, state)?;

            utils::from_plan(&plan, &expr, &[new_input])
        }
        LogicalPlan::Aggregate {
            input, aggr_expr, ..
        } => {
            // An aggregate's aggreagate columns are _not_ filter-commutable => collect these:
            // * columns whose aggregation expression depends on
            // * the aggregation columns themselves

            // construct set of columns that `aggr_expr` depends on
            let mut used_columns = HashSet::new();
            utils::exprlist_to_column_names(aggr_expr, &mut used_columns)?;

            let agg_columns = aggr_expr
                .iter()
                .map(|x| x.name(input.schema()))
                .collect::<Result<HashSet<_>>>()?;
            used_columns.extend(agg_columns);

            issue_filters(state, used_columns, plan)
        }
        LogicalPlan::Sort { .. } => {
            // sort is filter-commutable
            push_down(&state, plan)
        }
        LogicalPlan::Limit { input, .. } => {
            // limit is _not_ filter-commutable => collect all columns from its input
            let used_columns = input
                .schema()
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<HashSet<_>>();
            issue_filters(state, used_columns, plan)
        }
        LogicalPlan::Join { left, right, .. }
        | LogicalPlan::CrossJoin { left, right, .. } => {
            let (pushable_to_left, pushable_to_right, keep) =
                get_join_predicates(&state, &left.schema(), &right.schema());

            let mut left_state = state.clone();
            left_state.filters = keep_filters(&left_state.filters, &pushable_to_left);
            let left = optimize(left, left_state)?;

            let mut right_state = state.clone();
            right_state.filters = keep_filters(&right_state.filters, &pushable_to_right);
            let right = optimize(right, right_state)?;

            // create a new Join with the new `left` and `right`
            let expr = plan.expressions();
            let plan = utils::from_plan(&plan, &expr, &[left, right])?;

            if keep.0.is_empty() {
                Ok(plan)
            } else {
                // wrap the join on the filter whose predicates must be kept
                let plan = add_filter(plan, &keep.0);
                state.filters = remove_filters(&state.filters, &keep.1);

                Ok(plan)
            }
        }
        LogicalPlan::TableScan {
            source,
            projected_schema,
            filters,
            projection,
            table_name,
            limit,
        } => {
            let mut used_columns = HashSet::new();
            let mut new_filters = filters.clone();

            for (filter_expr, cols) in &state.filters {
                let (preserve_filter_node, add_to_provider) =
                    match source.supports_filter_pushdown(filter_expr)? {
                        TableProviderFilterPushDown::Unsupported => (true, false),
                        TableProviderFilterPushDown::Inexact => (true, true),
                        TableProviderFilterPushDown::Exact => (false, true),
                    };

                if preserve_filter_node {
                    used_columns.extend(cols.clone());
                }

                if add_to_provider {
                    // Don't add expression again if it's already present in
                    // pushed down filters.
                    if new_filters.contains(filter_expr) {
                        break;
                    }
                    new_filters.push(filter_expr.clone());
                }
            }

            issue_filters(
                state,
                used_columns,
                &LogicalPlan::TableScan {
                    source: source.clone(),
                    projection: projection.clone(),
                    projected_schema: projected_schema.clone(),
                    table_name: table_name.clone(),
                    filters: new_filters,
                    limit: *limit,
                },
            )
        }
        _ => {
            // all other plans are _not_ filter-commutable
            let used_columns = plan
                .schema()
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<HashSet<_>>();
            issue_filters(state, used_columns, plan)
        }
    }
}

impl OptimizerRule for FilterPushDown {
    fn name(&self) -> &str {
        "filter_push_down"
    }

    fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
        optimize(plan, State::default())
    }
}

impl FilterPushDown {
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}

/// replaces columns by its name on the projection.
fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
    let expressions = utils::expr_sub_expressions(&expr)?;

    let expressions = expressions
        .iter()
        .map(|e| rewrite(e, &projection))
        .collect::<Result<Vec<_>>>()?;

    if let Expr::Column(name) = expr {
        if let Some(expr) = projection.get(name) {
            return Ok(expr.clone());
        }
    }

    utils::rewrite_expression(&expr, &expressions)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::datasource::datasource::Statistics;
    use crate::datasource::TableProvider;
    use crate::logical_plan::{lit, sum, DFSchema, Expr, LogicalPlanBuilder, Operator};
    use crate::physical_plan::ExecutionPlan;
    use crate::test::*;
    use crate::{logical_plan::col, prelude::JoinType};
    use arrow::datatypes::SchemaRef;

    fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
        let rule = FilterPushDown::new();
        rule.optimize(plan, &ExecutionProps::new())
            .expect("failed to optimize plan")
    }

    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
        let optimized_plan = optimize_plan(plan);
        let formatted_plan = format!("{:?}", optimized_plan);
        assert_eq!(formatted_plan, expected);
    }

    #[test]
    fn filter_before_projection() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter is before projection
        let expected = "\
            Projection: #a, #b\
            \n  Filter: #a Eq Int64(1)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_after_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .limit(10)?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter is before single projection
        let expected = "\
            Filter: #a Eq Int64(1)\
            \n  Limit: 10\
            \n    Projection: #a, #b\
            \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_no_columns() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .filter(lit(0i64).eq(lit(1i64)))?
            .build()?;
        let expected = "\
            Filter: Int64(0) Eq Int64(1)\
            \n  TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_jump_2_plans() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b"), col("c")])?
            .project(vec![col("c"), col("b")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter is before double projection
        let expected = "\
            Projection: #c, #b\
            \n  Projection: #a, #b, #c\
            \n    Filter: #a Eq Int64(1)\
            \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_move_agg() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b")).alias("total_salary")])?
            .filter(col("a").gt(lit(10i64)))?
            .build()?;
        // filter of key aggregation is commutative
        let expected = "\
            Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS total_salary]]\
            \n  Filter: #a Gt Int64(10)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_keep_agg() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b")).alias("b")])?
            .filter(col("b").gt(lit(10i64)))?
            .build()?;
        // filter of aggregate is after aggregation since they are non-commutative
        let expected = "\
            Filter: #b Gt Int64(10)\
            \n  Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS b]]\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
    #[test]
    fn alias() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a").alias("b"), col("c")])?
            .filter(col("b").eq(lit(1i64)))?
            .build()?;
        // filter is before projection
        let expected = "\
            Projection: #a AS b, #c\
            \n  Filter: #a Eq Int64(1)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    fn add(left: Expr, right: Expr) -> Expr {
        Expr::BinaryExpr {
            left: Box::new(left),
            op: Operator::Plus,
            right: Box::new(right),
        }
    }

    fn multiply(left: Expr, right: Expr) -> Expr {
        Expr::BinaryExpr {
            left: Box::new(left),
            op: Operator::Multiply,
            right: Box::new(right),
        }
    }

    /// verifies that a filter is pushed to before a projection with a complex expression, the filter expression is correctly re-written
    #[test]
    fn complex_expression() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![
                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
                col("c"),
            ])?
            .filter(col("b").eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #b Eq Int64(1)\
            \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
            \n    TableScan: test projection=None"
        );

        // filter is before projection
        let expected = "\
            Projection: #a Multiply Int32(2) Plus #c AS b, #c\
            \n  Filter: #a Multiply Int32(2) Plus #c Eq Int64(1)\
            \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written
    #[test]
    fn complex_plan() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![
                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
                col("c"),
            ])?
            // second projection where we rename columns, just to make it difficult
            .project(vec![multiply(col("b"), lit(3)).alias("a"), col("c")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #a Eq Int64(1)\
            \n  Projection: #b Multiply Int32(3) AS a, #c\
            \n    Projection: #a Multiply Int32(2) Plus #c AS b, #c\
            \n      TableScan: test projection=None"
        );

        // filter is before the projections
        let expected = "\
        Projection: #b Multiply Int32(3) AS a, #c\
        \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
        \n    Filter: #a Multiply Int32(2) Plus #c Multiply Int32(3) Eq Int64(1)\
        \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that when two filters apply after an aggregation that only allows one to be pushed, one is pushed
    /// and the other not.
    #[test]
    fn multi_filter() -> Result<()> {
        // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a").alias("b"), col("c")])?
            .aggregate(vec![col("b")], vec![sum(col("c"))])?
            .filter(col("b").gt(lit(10i64)))?
            .filter(col("SUM(c)").gt(lit(10i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #SUM(c) Gt Int64(10)\
            \n  Filter: #b Gt Int64(10)\
            \n    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
            \n      Projection: #a AS b, #c\
            \n        TableScan: test projection=None"
        );

        // filter is before the projections
        let expected = "\
        Filter: #SUM(c) Gt Int64(10)\
        \n  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
        \n    Projection: #a AS b, #c\
        \n      Filter: #a Gt Int64(10)\
        \n        TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);

        Ok(())
    }

    /// verifies that when a filter with two predicates is applied after an aggregation that only allows one to be pushed, one is pushed
    /// and the other not.
    #[test]
    fn split_filter() -> Result<()> {
        // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a").alias("b"), col("c")])?
            .aggregate(vec![col("b")], vec![sum(col("c"))])?
            .filter(and(
                col("SUM(c)").gt(lit(10i64)),
                and(col("b").gt(lit(10i64)), col("SUM(c)").lt(lit(20i64))),
            ))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #SUM(c) Gt Int64(10) And #b Gt Int64(10) And #SUM(c) Lt Int64(20)\
            \n  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
            \n    Projection: #a AS b, #c\
            \n      TableScan: test projection=None"
        );

        // filter is before the projections
        let expected = "\
        Filter: #SUM(c) Gt Int64(10) And #SUM(c) Lt Int64(20)\
        \n  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
        \n    Projection: #a AS b, #c\
        \n      Filter: #a Gt Int64(10)\
        \n        TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);

        Ok(())
    }

    /// verifies that when two limits are in place, we jump neither
    #[test]
    fn double_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .limit(20)?
            .limit(10)?
            .project(vec![col("a"), col("b")])?
            .filter(col("a").eq(lit(1i64)))?
            .build()?;
        // filter does not just any of the limits
        let expected = "\
            Projection: #a, #b\
            \n  Filter: #a Eq Int64(1)\
            \n    Limit: 10\
            \n      Limit: 20\
            \n        Projection: #a, #b\
            \n          TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that filters with the same columns are correctly placed
    #[test]
    fn filter_2_breaks_limits() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a")])?
            .filter(col("a").lt_eq(lit(1i64)))?
            .limit(1)?
            .project(vec![col("a")])?
            .filter(col("a").gt_eq(lit(1i64)))?
            .build()?;
        // Should be able to move both filters below the projections

        // not part of the test
        assert_eq!(
            format!("{:?}", plan),
            "Filter: #a GtEq Int64(1)\
             \n  Projection: #a\
             \n    Limit: 1\
             \n      Filter: #a LtEq Int64(1)\
             \n        Projection: #a\
             \n          TableScan: test projection=None"
        );

        let expected = "\
        Projection: #a\
        \n  Filter: #a GtEq Int64(1)\
        \n    Limit: 1\
        \n      Projection: #a\
        \n        Filter: #a LtEq Int64(1)\
        \n          TableScan: test projection=None";

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that filters to be placed on the same depth are ANDed
    #[test]
    fn two_filters_on_same_depth() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .limit(1)?
            .filter(col("a").lt_eq(lit(1i64)))?
            .filter(col("a").gt_eq(lit(1i64)))?
            .project(vec![col("a")])?
            .build()?;

        // not part of the test
        assert_eq!(
            format!("{:?}", plan),
            "Projection: #a\
            \n  Filter: #a GtEq Int64(1)\
            \n    Filter: #a LtEq Int64(1)\
            \n      Limit: 1\
            \n        TableScan: test projection=None"
        );

        let expected = "\
        Projection: #a\
        \n  Filter: #a GtEq Int64(1) And #a LtEq Int64(1)\
        \n    Limit: 1\
        \n      TableScan: test projection=None";

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// verifies that filters on a plan with user nodes are not lost
    /// (ARROW-10547)
    #[test]
    fn filters_user_defined_node() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(&table_scan)
            .filter(col("a").lt_eq(lit(1i64)))?
            .build()?;

        let plan = crate::test::user_defined::new(plan);

        let expected = "\
            TestUserDefined\
             \n  Filter: #a LtEq Int64(1)\
             \n    TableScan: test projection=None";

        // not part of the test
        assert_eq!(format!("{:?}", plan), expected);

        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// post-join predicates on a column common to both sides is pushed to both sides
    #[test]
    fn filter_join_on_common_independent() -> Result<()> {
        let table_scan = test_table_scan()?;
        let left = LogicalPlanBuilder::from(&table_scan).build()?;
        let right = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a")])?
            .build()?;
        let plan = LogicalPlanBuilder::from(&left)
            .join(&right, JoinType::Inner, &["a"], &["a"])?
            .filter(col("a").lt_eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #a LtEq Int64(1)\
            \n  Join: a = a\
            \n    TableScan: test projection=None\
            \n    Projection: #a\
            \n      TableScan: test projection=None"
        );

        // filter sent to side before the join
        let expected = "\
        Join: a = a\
        \n  Filter: #a LtEq Int64(1)\
        \n    TableScan: test projection=None\
        \n  Projection: #a\
        \n    Filter: #a LtEq Int64(1)\
        \n      TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// post-join predicates with columns from both sides are not pushed
    #[test]
    fn filter_join_on_common_dependent() -> Result<()> {
        let table_scan = test_table_scan()?;
        let left = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("c")])?
            .build()?;
        let right = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .build()?;
        let plan = LogicalPlanBuilder::from(&left)
            .join(&right, JoinType::Inner, &["a"], &["a"])?
            // "b" and "c" are not shared by either side: they are only available together after the join
            .filter(col("c").lt_eq(col("b")))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #c LtEq #b\
            \n  Join: a = a\
            \n    Projection: #a, #c\
            \n      TableScan: test projection=None\
            \n    Projection: #a, #b\
            \n      TableScan: test projection=None"
        );

        // expected is equal: no push-down
        let expected = &format!("{:?}", plan);
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    /// post-join predicates with columns from one side of a join are pushed only to that side
    #[test]
    fn filter_join_on_one_side() -> Result<()> {
        let table_scan = test_table_scan()?;
        let left = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("b")])?
            .build()?;
        let right = LogicalPlanBuilder::from(&table_scan)
            .project(vec![col("a"), col("c")])?
            .build()?;
        let plan = LogicalPlanBuilder::from(&left)
            .join(&right, JoinType::Inner, &["a"], &["a"])?
            .filter(col("b").lt_eq(lit(1i64)))?
            .build()?;

        // not part of the test, just good to know:
        assert_eq!(
            format!("{:?}", plan),
            "\
            Filter: #b LtEq Int64(1)\
            \n  Join: a = a\
            \n    Projection: #a, #b\
            \n      TableScan: test projection=None\
            \n    Projection: #a, #c\
            \n      TableScan: test projection=None"
        );

        let expected = "\
        Join: a = a\
        \n  Projection: #a, #b\
        \n    Filter: #b LtEq Int64(1)\
        \n      TableScan: test projection=None\
        \n  Projection: #a, #c\
        \n    TableScan: test projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    struct PushDownProvider {
        pub filter_support: TableProviderFilterPushDown,
    }

    impl TableProvider for PushDownProvider {
        fn schema(&self) -> SchemaRef {
            Arc::new(arrow::datatypes::Schema::new(vec![
                arrow::datatypes::Field::new(
                    "a",
                    arrow::datatypes::DataType::Int32,
                    true,
                ),
            ]))
        }

        fn scan(
            &self,
            _: &Option<Vec<usize>>,
            _: usize,
            _: &[Expr],
            _: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn supports_filter_pushdown(
            &self,
            _: &Expr,
        ) -> Result<TableProviderFilterPushDown> {
            Ok(self.filter_support.clone())
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn statistics(&self) -> Statistics {
            Statistics::default()
        }
    }

    fn table_scan_with_pushdown_provider(
        filter_support: TableProviderFilterPushDown,
    ) -> Result<LogicalPlan> {
        let test_provider = PushDownProvider { filter_support };

        let table_scan = LogicalPlan::TableScan {
            table_name: "".into(),
            filters: vec![],
            projected_schema: Arc::new(DFSchema::try_from_qualified(
                "",
                &*test_provider.schema(),
            )?),
            projection: None,
            source: Arc::new(test_provider),
            limit: None,
        };

        LogicalPlanBuilder::from(&table_scan)
            .filter(col("a").eq(lit(1i64)))?
            .build()
    }

    #[test]
    fn filter_with_table_provider_exact() -> Result<()> {
        let plan = table_scan_with_pushdown_provider(TableProviderFilterPushDown::Exact)?;

        let expected = "\
        TableScan: projection=None, filters=[#a Eq Int64(1)]";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_with_table_provider_inexact() -> Result<()> {
        let plan =
            table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?;

        let expected = "\
        Filter: #a Eq Int64(1)\
        \n  TableScan: projection=None, filters=[#a Eq Int64(1)]";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }

    #[test]
    fn filter_with_table_provider_multiple_invocations() -> Result<()> {
        let plan =
            table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?;

        let optimised_plan = optimize_plan(&plan);

        let expected = "\
        Filter: #a Eq Int64(1)\
        \n  TableScan: projection=None, filters=[#a Eq Int64(1)]";

        // Optimizing the same plan multiple times should produce the same plan
        // each time.
        assert_optimized_plan_eq(&optimised_plan, expected);
        Ok(())
    }

    #[test]
    fn filter_with_table_provider_unsupported() -> Result<()> {
        let plan =
            table_scan_with_pushdown_provider(TableProviderFilterPushDown::Unsupported)?;

        let expected = "\
        Filter: #a Eq Int64(1)\
        \n  TableScan: projection=None";
        assert_optimized_plan_eq(&plan, expected);
        Ok(())
    }
}
